迭代器
class Fib(object): # 迭代器
def __init__(self, num):
self.num = num
self.a, self.b = 0, 1
self.idx = 0
def __iter__(self):
return self
def __next__(self):
if self.idx < self.num:
self.a, self.b = self.b, self.a + self.b
self.idx += 1
return self.a
raise StopIteration()
a = Fib(1)
b = Fib(5)
c = Fib(10)
print(a)
print(b)
print(c)
生成器
def fib(num): # 生成器
a, b = 0, 1
for _ in range(num):
a, b = b, a + b
yield a
print(fib(1))
print(fib(5))
print(fib(10))
Python 實現併發編程的三種方案:多線程 (Multithreading)、多行程 (Multi-Process) 和異步 I/O。
併發編程的好處在於可以提升執行效率以及改善用戶體驗。
壞處在於併發的程序不容易開發和調試。
import time
import threading
from concurrent.futures import ThreadPoolExecutor
class Account(object): # 銀行帳戶
def __init__(self):
self.balance = 0.0
self.lock = threading.Lock()
def deposit(self, money): # 通過鎖保護臨界資源
with self.lock:
new_balance = self.balance + money
time.sleep(0.001)
self.balance = new_balance
class AddMoneyThread(threading.Thread): # 定義線程 (thread) 類別
def __init__(self, account, money):
self.account = account
self.money = money
# 繼承父類別的初始化, 定義線程 (thread) 的初始化
super().__init__()
def run(self): # 線程 (thread) 啟動後要執行的操作
self.account.deposit(self.money)
def main():
account = Account()
# 創建線程 (thread) 池
pool = ThreadPoolExecutor(max_workers = 10)
futures = []
for _ in range(100):
# 創建線程 (thread) 的第 1 種方式
# threading.Thread( target = account.deposit, args=(1, ) ).start()
# 創建線程 (thread) 的第 2 種方式
# AddMoneyThread(account, 1).start()
# 創建線程 (thread) 的第 3 種方式
# 調用線程 (thread) 池中的線程 (thread) 來執行特定的任務
future = pool.submit(account.deposit, 1)
futures.append(future)
pool.shutdown() # 關閉線程 (thread)池
for future in futures:
future.result()
print(account.balance)
if __name__ == '__main__':
main()
修改前一個的程式,啟動 5 個線程 (thread) 向帳戶中存錢,5 個線程 (thread) 從帳戶中取錢,取錢時如果餘額不足就暫停線程 (thread) 進行等待。為了達到上述目標,需對存錢和取錢的線程 (thread) 進行調度,在餘額不足時取錢的線程 (thread) 暫停並釋放鎖,而存錢的線程 (thread) 將錢存入後要通知取錢的線程 (thread),使其從暫停狀態被喚醒。可以使用 threading 模組的 Condition 來實現調度。
from concurrent.futures import ThreadPoolExecutor
from random import randint
from time import sleep
import threading
class Account(): # 銀行帳戶
def __init__(self, balance = 0):
self.balance = balance
lock = threading.Lock()
self.condition = threading.Condition(lock)
def withdraw(self, money): # 取錢
with self.condition:
while money > self.balance:
self.condition.wait()
new_balance = self.balance - money
sleep(0.001)
self.balance = new_balance
def deposit(self, money): # 存錢
with self.condition:
new_balance = self.balance + money
sleep(0.001)
self.balance = new_balance
self.condition.notify_all()
def add_money(account):
while True:
money = randint(5, 10)
account.deposit(money)
print(threading.current_thread().name,
':', money, '====>', account.balance)
sleep(0.5)
def sub_money(account):
while True:
money = randint(10, 30)
account.withdraw(money)
print(threading.current_thread().name,
':', money, '<====', account.balance)
sleep(1)
def main():
account = Account()
with ThreadPoolExecutor(max_workers=10) as pool:
for _ in range(5):
pool.submit(add_money, account)
pool.submit(sub_money, account)
if __name__ == '__main__':
main()
這個程式會不斷執行。
import concurrent.futures
import math
PRIMES = [
1116281,
1297337,
104395303,
472882027,
533000389,
817504243,
982451653,
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419
] * 5
def is_prime(n): # 判斷質數
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
import asyncio
def num_generator(m, n): # 指定範圍的數字生成器
yield from range(m, n + 1)
async def prime_filter(m, n): # 質數過濾器
primes = []
for i in num_generator(m, n):
flag = True
for j in range(2, int(i ** 0.5 + 1)):
if i % j == 0:
flag = False
break
if flag:
print('Prime =>', i)
primes.append(i)
await asyncio.sleep(0.001)
return tuple(primes)
async def square_mapper(m, n): # 平方映射
squares = []
for i in num_generator(m, n):
print('Square =>', i * i)
squares.append(i * i)
await asyncio.sleep(0.001)
return squares
def main():
loop = asyncio.get_event_loop()
future = asyncio.gather(prime_filter(2, 15), square_mapper(1, 100))
future.add_done_callback(lambda x: print(x.result()))
loop.run_until_complete(future)
loop.close()
if __name__ == '__main__':
main()
Python 有一個 aiohttp 的三方庫,它提供了異步的 HTTP 客戶端和服務器,可以跟 asyncio 模組一起工作,並提供對 Future 物件的支持。
下面的代碼異步的從 5 個 URL 中獲取頁面並通過正規表示式的命名提取了網站的標題。
import asyncio
import re
import aiohttp
PATTERN = re.compile(r'\<title\>(?P<title>.*)\<\/title\>')
async def fetch_page(session, url):
async with session.get(url, ssl=False) as resp:
return await resp.text()
async def show_title(url):
async with aiohttp.ClientSession() as session:
html = await fetch_page(session, url)
print(PATTERN.search(html).group('title'))
def main():
urls = ('https://www.python.org/',
'https://zh.wikipedia.org/',
'https://ithelp.ithome.com.tw/',
'https://www.google.com/',
'https://www.youtube.com/')
loop = asyncio.get_event_loop()
tasks = [show_title(url) for url in urls]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
if __name__ == '__main__':
main()